Defining DataFrame Schemas with StructField and StructType

Spark SQL's StructType and StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. A StructType is a collection of StructFields, each of which defines a column name, a column data type, a boolean flag indicating whether the field can be nullable, and metadata. This approach is used when you cannot define case classes ahead of time; for example, when the structure of the records is encoded in a text dataset or a string.

To create a DataFrame with a programmatically defined schema, the following steps can be used (demonstrated in the examples below):
  • Use the existing RDD to create an RDD of Rows.
  • Create the schema, represented by a StructType that matches the structure of the Rows.
  • Apply the schema to the RDD of Rows using the createDataFrame method.

StructType -- Defines the structure of the DataFrame
Spark provides the org.apache.spark.sql.types.StructType class to define the structure of the DataFrame; it is a collection (a list) of StructField objects. When you call the printSchema() method on a DataFrame, StructType columns are represented as "struct".
StructField -- Defines the metadata of the DataFrame columns
Spark provides the org.apache.spark.sql.types.StructField class to define the column name (String), column type (DataType), nullable flag (Boolean), and metadata (Metadata).
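The metadata parameter is not shown in the examples below; as a minimal sketch (the field name and the "comment" key here are illustrative, not required by Spark), a fully specified StructField can be built with MetadataBuilder:

import org.apache.spark.sql.types._

// All four StructField parameters: name, data type, nullability, and metadata.
val ageField = StructField(
  "age",
  IntegerType,
  nullable = true,
  new MetadataBuilder().putString("comment", "age in years").build())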

Example
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Read the raw text file; each line holds a comma-separated name and age.
val lines = sc.textFile("/FileStore/tables/people.txt")

// Schema matching the structure of the rows.
val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

// Convert each line into a Row, trimming whitespace before parsing the age.
val rowRDD = lines.map(_.split(",")).map(e => Row(e(0), e(1).trim.toInt))

// Apply the schema to the RDD of Rows.
val empdf = spark.createDataFrame(rowRDD, schema)
empdf.show()
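Assuming people.txt contains the classic Spark sample records (Michael, 29 / Andy, 30 / Justin, 19), empdf.show() would print something like:

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+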
 
Defining a Schema with the :: Operator

In Scala, a :: b :: Nil builds a List, so this variant passes a List[StructField] instead of an Array; StructType accepts either.
 
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val lines = sc.textFile("/FileStore/tables/people.txt")

// The :: operator prepends each StructField to the List, which ends with Nil.
val schema = StructType(
  StructField("name", StringType, true) ::
  StructField("age", IntegerType, true) :: Nil)

val rowRDD = lines.map(_.split(",")).map(e => Row(e(0), e(1).trim.toInt))
val empdf = spark.createDataFrame(rowRDD, schema)
empdf.show()

How to Create an Empty DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val schema = StructType(
  StructField("Name", StringType, true) ::
  StructField("Age", IntegerType, false) :: Nil)

// An empty RDD of Rows plus a schema yields an empty, typed DataFrame.
val df = spark.createDataFrame(sc.emptyRDD[Row], schema)
df.show()
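Because the DataFrame has a schema but no rows, df.show() prints only the header:

+----+---+
|Name|Age|
+----+---+
+----+---+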

// Alternatively, convert an empty Seq of tuples directly.
import spark.implicits._
val emptyDf = Seq.empty[(String, Int)].toDF("name", "age")
emptyDf.show()

// Or start from an empty Dataset of a case class (the encoder comes from spark.implicits._).
import spark.implicits._
case class Person(id: Int, name: String)
val df = spark.emptyDataset[Person].toDF
df.show()


Defining a Nested StructType
val structureData = Seq(
  Row(Row("James ","","Smith"),"36636","M",3100),
  Row(Row("Michael ","Rose",""),"40288","M",4300),
  Row(Row("Robert ","","Williams"),"42114","M",1400),
  Row(Row("Maria ","Anne","Jones"),"39192","F",5500),
  Row(Row("Jen","Mary","Brown"),"","F",-1)
)


val structureSchema = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("middlename", StringType)
    .add("lastname", StringType))
  .add("id", StringType)
  .add("gender", StringType)
  .add("salary", IntegerType)

In the spark-shell, structureSchema evaluates to:

structureSchema: org.apache.spark.sql.types.StructType =
StructType(StructField(name,StructType(StructField(firstname,StringType,true), StructField(middlename,StringType,true), StructField(lastname,StringType,true)),true), StructField(id,StringType,true), StructField(gender,StringType,true), StructField(salary,IntegerType,true))

val df2 = spark.createDataFrame(
  spark.sparkContext.parallelize(structureData), structureSchema)
df2.printSchema()
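printSchema() represents the nested StructType column as "struct"; the output looks like:

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

Nested fields can then be selected with dot notation:

// Select individual fields out of the nested "name" struct column.
df2.select("name.firstname", "name.lastname").show()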
